package com.atguigu.flink.sql.connector;

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

/**
 * Created by Smexy on 2023/4/11
 */
public class Demo1_FileRead
{
    public static void main(String[] args) {

        //设置表的运行环境
        EnvironmentSettings environmentSettings = EnvironmentSettings.newInstance().inStreamingMode().build();

        //不借助流进行操作，直接读取外部数据源，映射为表
        TableEnvironment tableEnvironment = TableEnvironment.create(environmentSettings);

        /*
                自己建表
                    'connector' = 'filesystem' : 读取文件系统中的数据
                    path: 读取的文件的路径
                    format： 文件格式

                 元数据信息，也可以通过一列来获取，但是必须声明 METADATA，必须写在列的最后。
         */
        String createTableSql = " create table t1 ( id STRING, ts BIGINT , vc INT ," +
            "  `file.path` STRING NOT NULL METADATA " +
            "    )WITH (" +
            "  'connector' = 'filesystem'," +
            "  'path' = 'data/sensor.json'," +
            "  'format' = 'json'" +
            ") ";

        //建表
        tableEnvironment.executeSql(createTableSql);

        //查询
        tableEnvironment.sqlQuery(" select * from t1  ").execute().print();

    }
}
